<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2018/5/23 0023
 * Time: 下午 8:53
 */

class Pool{

    protected  $table;
    protected  $mpid; //进程id

    public  function  __construct()
    {

            $this->mpid=getmypid();
            $this->table=$this->create_table(); //将进程pid跟任务内容进行绑定
            $this->run();
            $this->signal();//信号监听
    }


    //获取任务,创建进程
    protected  function  run(){
        $data=[
            '990979940@qq.com',
            '213123@qq.com',
            '457567657@qq.com'
        ];

        foreach ($data as $v){
            $process=$this->create_process();
            //pid=>990979940@qq.com  pid=>任务内容
             $this->table->set($process->pid,['data'=>$v]);
             $process->write($v);
        }

       /*foreach($this->table as $row)
        {
            var_dump($row);
        }*/
    }

    //创建一个内存表
    protected  function  create_table(){
        $table=new swoole\table(1024);
        $table->column('attempts',swoole\table::TYPE_INT,2);
        $table->column('data',swoole\table::TYPE_STRING,64);
        $table->create();
        return $table;
    }


    /**
     * 创建子进程
     */
    protected  function  create_process(){
        $process=new swoole\Process([$this,'callback_function']);
        $process->start(); //启动子进程
          //异步读取管道消息
          swoole_event::add($process->pipe,function () use($process){
                $data=json_decode($process->read(),true);
                if($data['status']==false);
                swoole_event::del($process->pipe);
                echo '子进程发送的内容:'.PHP_EOL;
                var_dump($data);
          });

          return $process;
    }

    //子进程业务处理逻辑
    public  function callback_function($worker){
        $res=$worker->read();
        $rand=mt_rand(1,2);

        for ($i=0;$i<20;$i++){
            echo '子进程在执行'.$i.PHP_EOL;
            $this->checkPid($worker,$this->mpid);//检测主进程是否异常终止避免子进程成为僵尸进程
            sleep(1);
        }

        if($rand==1){
            throw  new Exception('出现异常了');
        }

       /*//执行发短信的逻辑,正常的逻辑
        $res=false;
        if($res==false){
            $data=[
                'status'=>false,'data'=>$res,'msg'=>'任务执行成功'
            ];
            $worker->write(json_encode($data)); //写入到主进程
            return;
        }*/

        $data=[
            'status'=>true,'data'=>$res,'msg'=>'任务执行成功'
        ];
        $worker->write(json_encode($data)); //写入到主进程

        $this->table->del($worker->pid); //删除关系绑定的记录

    }

    /**
     *
     *检测主进程是否存在
     * @param $mpid 父进程pid
     *
     */
    public  function  checkPid($worker,$mpid){

         if(!swoole\Process::kill($mpid,0)){

             //等待子进程任务处理完毕之后再去关闭,根据业务场景,如果需要跟主进程交互的,主进程关闭了
             //子进程需要关闭,如果不需要的,等待子进程执行完毕之后再关闭

             $msg="主进程已经退出,工作进程{$worker->pid}退出";
             echo $msg.PHP_EOL;
             $worker->exit();//子进程退出
         }
    }

    //捕获子进程结束时的信号,回收子进程
    public  function  signal(){

        swoole_process::signal(SIGCHLD, function($sig) {
            //必须为false，非阻塞模式
            while($ret =  swoole_process::wait(false)) {
                   if($ret['code']>0){ //出现非0状态码
                        $pid=$ret['pid'] ;//子进程结束pid
                        $data=$this->table->get($pid);
                       $this->table->del($pid); //删除之前内容
                       //判断重试次数
                       if($data['attempts']==0){
                            $process=$this->create_process(); //重新拉起进程,会产生新的pid
                            $this->table->set($process->pid,['data'=>$data['data'],'attempts'=>$data['attempts']+1]); //新创建的进程
                            $process->write($data['data']);
                            return;
                        }
                        //file_put_contents(); 记录日志|发送邮件|异常队列当中
                   }
            }
           /* foreach($this->table as $row) {
                  var_dump($row);
            }*/

        });
    }

}

new Pool();